【CDK】StepFunctionsのMapステートを使って処理を並列実行させる
はじめに
先日、複数のデータに対して同じ処理を実行させたいという場面があり、Step FunctionsのMapステートを使って実装しました。とても便利だったので、復習を兼ねて記事に残しておこうと思います。
やりたいこと
下図のようなフローをCDKで作成します。
まずGetListで処理を実行させたいデータの一覧を取得します。続いてその一覧をMapステートに渡して、各データに対してSomeProcessを並列に実行させるようにします。
環境
本記事を執筆したときの環境は以下になります。
- AWS CDK 2.84.0
- Node.js v18.16.0
また、CDKのインストールやブートストラップ等は実施済みであるとします。もしまだ実施していないという場合は、公式サイトを参考にしてみてください。
AWS CDK の開始方法 - AWS Cloud Development Kit (AWS CDK) v2
やってみる
CDKプロジェクトの作成
プロジェクトフォルダを作成し、下記コマンドを実行します。ここでは言語としてTypeScriptを選択します。
cdk init --language typescript
プロジェクトフォルダ内に必要なファイルが作成されます。
Lambda関数用コードの作成
まずはGetList
とSomeProcess
の2つのLambda関数用コードを作成します。
プロジェクトフォルダ直下にlambda
というフォルダを作成し、get-list.ts
とsome-process.ts
を作成します。
get-list.ts
のコードは以下のようにしてみました。
interface Person { name: string age: number address: string } export const handler = async (): Promise<Person[]> => { try { console.log("start get-list.ts") // 並列に処理したい各データ const data = [ { name: 'Tanaka', age: 20, address: 'Tokyo', }, { name: 'Mizuno', age: 30, address: 'Osaka', }, { name: 'Yamada', age: 25, address: 'Nagoya', }, { name: 'Sato', age: 28, address: 'Fukuoka', }, ] return data } catch (error) { throw error } }
DBか何かから人物のデータ一覧を取得して、それぞれのデータに対して何か並列に処理を行うというイメージです。Mapステートに渡すために、データを配列形式で返します。
some-process.ts
のコードは次のようにしてみました。
interface ProcessEvent { param: { name: string age: number } } export const handler = async (event: ProcessEvent): Promise<void> => { try { console.log(`start some-process.ts name: ${event.param.name} age: ${event.param.age}`) // 時間がかかる処理 await sleep(5000) console.log(`end some-process.ts name: ${event.param.name} age: ${event.param.age}`) } catch (error) { throw error } } const sleep = (msec: number) => new Promise((resolve) => setTimeout(resolve, msec))
このLambda関数は人物のデータを受け取って、何か時間のかかる処理を行うというイメージです。引数としてProcessEvent
型の値を受け取るように定義しています。
Step Functionsの作成
続いてStep Functionsのリソースを作っていきます。
lib
フォルダ配下に自動的に作成されたts
ファイルがあると思いますので、そのファイルを次のように編集します。
import * as cdk from 'aws-cdk-lib'; import { Duration } from 'aws-cdk-lib'; import { Runtime } from 'aws-cdk-lib/aws-lambda'; import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'; import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks'; import { LogLevel, StateMachine, Map } from 'aws-cdk-lib/aws-stepfunctions' import { Construct } from 'constructs'; import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs'; // import * as sqs from 'aws-cdk-lib/aws-sqs'; export class CdkStepFunctionsMapStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); // データ一覧を取得するLambda関数 const getListLambda = new NodejsFunction( this, 'GetListLambda', { entry: './lambda/get-list.ts', handler: 'handler', timeout: Duration.minutes(10), memorySize: 256, runtime: Runtime.NODEJS_18_X, } ) // 各データに対して処理するLambda関数 const someProcessLambda = new NodejsFunction( this, 'SomeProcessLambda', { entry: './lambda/some-process.ts', handler: 'handler', timeout: Duration.minutes(10), memorySize: 256, runtime: Runtime.NODEJS_18_X, } ) // データ一覧を取得するLambda関数を実行するプロセス const getListProcessState = new LambdaInvoke( this, 'getListProcessState', { lambdaFunction: getListLambda, } ) // 各データに対して処理するLambda関数を実行するプロセス const someProcessProcessState = new LambdaInvoke( this, 'someProcessProcessState', { lambdaFunction: someProcessLambda, } ) // 人物一覧を分割するためのMapステート const mapPersonsProcessState = new Map(this, 'mapPersons', { itemsPath: '$.Payload', maxConcurrency: 5, resultPath: '$.mapOutput', parameters: { param: { 'name.$': '$$.Map.Item.Value.name', 'age.$': '$$.Map.Item.Value.age', }, }, }) // Map関数のイテレータを指定 mapPersonsProcessState.iterator(someProcessProcessState) // リトライ設定 getListProcessState.addRetry({ interval: Duration.seconds(10), maxAttempts: 2, backoffRate: 2, }) someProcessProcessState.addRetry({ interval: Duration.seconds(10), maxAttempts: 2, backoffRate: 2, }) // Step Functionsを作成 const sampleStateMachine = new StateMachine( this, 'sampleStateMachine', { stateMachineName: `sampleStateMachine`, definition: getListProcessState.next(mapPersonsProcessState), logs: { level: LogLevel.ALL, destination: new LogGroup( this, 'SampleStateMachineLogGroup', { retention: RetentionDays.ONE_WEEK, } ), }, } ) } }
まず、NodejsFunction
で作成したget-list.ts
とsome-process.ts
をLambda関数化します。そして、LambdaInvoke
でStep FunctionsからLambda関数を呼び出すためのアクションを作成します。
MapステートのitemPath
は、処理したい配列が格納されているJSONパスを指定します。今回の場合、getList
からの出力は以下のようなJSONになります。
{ "ExecutedVersion": "$LATEST", "Payload": [ { "name": "Tanaka", "age": 20, "address": "Tokyo" }, { "name": "Mizuno", "age": 30, "address": "Osaka" }, { "name": "Yamada", "age": 25, "address": "Nagoya" }, { "name": "Sato", "age": 28, "address": "Fukuoka" } ], (以下略)
$
はオブジェクトそのものを指すので、処理したい配列が格納されているパスは$.Payload
となります。
また、someProcessは引数として以下のオブジェクト型を要求していました。
param: { name: string age: number }
そこで、Mapステートのparameters
を指定して、この型に合致する値が渡されるようにしています。$$.Map.Item.Value
にはMapステートで分割された配列の各要素、つまり
{ "name": "Tanaka", "age": 20, "address": "Tokyo" }
のような値が入っています。そこで、そのうちのname
とage
を抜き出して指定しています。
Mapステートの同時実行数を5に設定しているので、配列のうち5つずつ並列に処理されることになります。
そして、Mapステートで分割された各要素に対して実行させる処理として、someProcess
を設定しています。
Step Functionsを作成する際には、まずgetList
を呼び出し、その次にMapステートが実行されるように定義しています。MapステートのイテレータとしてsomeProcess
を呼ぶように既に指定しているので、これだけの定義で大丈夫です。
動作確認
CDKをデプロイします。
cdk deploy
デプロイの途中でこのようなメッセージが表示された場合、yを入力します。
スタックのARNとTotal timeが表示されればデプロイ完了です。
AWSマネジメントコンソールでStep Functionsのページを開きます。sampleStateMachine
が作成されています。
sampleStateMachineをクリックして、ページ下部の「実行を開始」をクリックします。
このようなウインドウが表示されますが、最初に実行されるgetList
は特に入力はあってもなくても関係がないので、このままでOKです。
実行するとこのように進行状況が図で表示されます。
数秒待つと緑色に変わり、処理が成功します。
どのような処理が行われたかは「イベント」を見るとわかります。
上記のイベントのうち、MapStateEntered
イベントを開いてみると、以下のようにイベントが定義されています。getList
で取得した人物のデータが配列で渡されています。
{ "name": "mapPersons", "input": { "ExecutedVersion": "$LATEST", "Payload": [ { "name": "Tanaka", "age": 20, "address": "Tokyo" }, { "name": "Mizuno", "age": 30, "address": "Osaka" }, { "name": "Yamada", "age": 25, "address": "Nagoya" }, { "name": "Sato", "age": 28, "address": "Fukuoka" } ], (以下略)
そして、someProcessProcessState
イベントを見てみると、配列の要素の一つがインプットになっていることがわかります。
開始から終了までの時間を見てみると、6秒ほどでした。someProcess
では5秒待つ処理を入れており、順次実行されるとすると20秒かかるはずなので、並列に実行されたことがわかります。
おわりに
同じ処理を複数のデータに対して並列に実行させたいというケースはよくあると思います。そういうときにStep Functionsを使って簡単に実装できるので便利だと思いました。
この記事がどなたかの参考になれば幸いです。